package cnki.kg.demo;

import cnki.kg.demo.util.KafkaUtils;
import cnki.kg.demo.util.R;
import com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * Demo REST controller that exposes topic administration and message publishing
 * through {@link KafkaUtils}, and hosts two sample Kafka listeners.
 */
@RestController
public class KafkaController {
    /** Partition count used by {@link #add(String)}. */
    private static final int DEFAULT_PARTITIONS = 3;
    /** Partition count used by {@link #createTopic(String)}. */
    private static final int SINGLE_PARTITION = 1;
    /** Replication factor used for every topic created by this controller. */
    private static final short REPLICATION_FACTOR = 1;

    @Autowired
    private KafkaUtils kafkaUtils;

    /**
     * Simple greeting endpoint.
     *
     * @param name value interpolated into the greeting; a missing value is rendered as "null"
     * @return a success result whose "data" entry holds the greeting text
     */
    @GetMapping("hello")
    public R hello(String name) {
        return R.success().put("data", String.format("hello,%s !", name));
    }

    /**
     * Creates a topic with three partitions and a replication factor of one.
     * (The underlying utility accepts a batch; a single topic is used here as a demo.)
     *
     * @param topic name of the topic to create
     * @return a success result
     */
    @PostMapping("kafka")
    public R add(String topic) {
        return createSingleTopic(topic, DEFAULT_PARTITIONS);
    }

    /**
     * Creates a topic with one partition and a replication factor of one.
     *
     * @param topic name of the topic to create
     * @return a success result
     */
    @GetMapping("kafka/create")
    public R createTopic(String topic) {
        return createSingleTopic(topic, SINGLE_PARTITION);
    }

    /**
     * Looks up information about a topic.
     * (The underlying utility accepts a batch; a single topic is used here as a demo.)
     *
     * @param topic name of the topic to look up
     * @return a success result whose "data" entry holds whatever {@code getTopicInfo} returned
     */
    @GetMapping("kafka/{topic}")
    public R getBytTopic(@PathVariable String topic) {
        return R.success().put("data", kafkaUtils.getTopicInfo(Lists.newArrayList(topic)));
    }

    /**
     * Deletes a topic.
     * (The underlying utility accepts a batch; a single topic is used here as a demo.)
     *
     * <p>Note: if the topic is currently being listened to, the deletion can appear
     * to have no effect, because the topic is deleted and then created again.
     *
     * @param topic name of the topic to delete
     * @return a success result
     */
    @DeleteMapping("kafka/{topic}")
    public R delete(@PathVariable String topic) {
        kafkaUtils.deleteTopic(Lists.newArrayList(topic));
        return R.success();
    }

    /**
     * Lists all topics.
     *
     * @return a success result whose "data" entry holds whatever {@code getAllTopic} returned
     */
    @GetMapping("kafka/allTopic")
    public R getAllTopic() {
        return R.success().put("data", kafkaUtils.getAllTopic());
    }

    /**
     * Producer demo: sends a message to a topic.
     *
     * @param topic   name of the destination topic
     * @param message payload to send
     * @return a success result
     */
    @GetMapping("kafka/message")
    public R sendMessage(String topic, String message) {
        kafkaUtils.sendMessage(topic, message);
        return R.success();
    }

    /**
     * Consumer demo: annotation-driven listener on several fixed topics that prints
     * every received message.
     *
     * <p>Note: a listened topic that does not exist yet is created automatically.
     *
     * <p>NOTE(review): this listener never acknowledges, while {@link #exportData}
     * acknowledges manually — confirm that the container's ack mode suits both.
     *
     * @param message payload of the received record
     */
    @KafkaListener(topics = {"tanc2", "aaa"})
    public void consume(String message) {
        System.out.println("receive msg: " + message);
    }

    /**
     * Consumer demo: listens on the comma-separated topics configured under
     * {@code spring.kafka.topics}, prints each record's value and then acknowledges it.
     *
     * @param consumer the received record
     * @param ack      handle used to acknowledge the record after it has been printed
     */
    @KafkaListener(topics = "#{'${spring.kafka.topics}'.split(',')}")
    public void exportData(ConsumerRecord<?, ?> consumer, Acknowledgment ack) {
        // The record is wildcard-typed, so the value is not guaranteed to be a String.
        // String.valueOf avoids a ClassCastException that would otherwise abort the
        // method before the record is acknowledged; String and null values print as before.
        String value = String.valueOf(consumer.value());
        System.out.println("receive msg: " + value);

        ack.acknowledge();
    }

    /** Creates one topic with the given partition count and reports success. */
    private R createSingleTopic(String topic, int partitions) {
        NewTopic newTopic = new NewTopic(topic, partitions, REPLICATION_FACTOR);
        kafkaUtils.createTopic(Lists.newArrayList(newTopic));
        return R.success();
    }
}
